iT邦幫忙

2022 iThome 鐵人賽

DAY 26
2
DevOps

從零開始的 Jenkins 之旅系列 第 26

第二十六天 Jenkins 之旅: 我的 BI 報表 Pipeline (6)

  • 分享至 

  • xImage
  •  

前言

在這次我們的 source data 為第三方來源(爬蟲),往往因為網站的變動又或是被 ban IP 而無法確定資料有被正確爬取(包含資料筆數、欄位正確性)。這次我們並不會著重在如何優化爬蟲程式,而是重點討論檢驗資料正確性。因此今天主要處理資料品質的問題。

爬蟲程式執行中

在資料由爬蟲程式拿到時,我們可以用 pydantic 進行初步的型態檢測,當 item 拿到的資料無法適配宣告的型態時,會在 console 拋出錯誤(程式不會停止)並同時放棄當前異常的 item 被寫入資料庫。

# src/item.py
class DataAttributeItem(pydantic.BaseModel):
    """data attribute"""
    crawl_datetime: datetime.datetime = datetime.datetime.now()

class IthomeUserInfoItem(DataAttributeItem):
    source: str = "ithome_user_info_item"
    user_id: int
    user_name: str
    ithome_level: str
    ithome_point: int
    user_viewed: int
    user_followed: int
    ask_question: int
    article: int
    answer: int
    invitation_answer: int
    best_answer: int

在爬蟲程式結束時,Scrapy 會產生一個摘要

[scrapy.statscollectors] INFO: Dumping Scrapy stats:
{'downloader/request_bytes': 171563,
 'downloader/request_count': 471,
 'downloader/request_method_count/GET': 471,
 'downloader/response_bytes': 6115445,
 'downloader/response_count': 471,
 'downloader/response_status_count/200': 471,
 'elapsed_time_seconds': 85.497271,
 'finish_reason': 'finished',
 'finish_time': datetime.datetime(2022, 9, 25, 16, 1, 33, 324185),
 'httpcompression/response_bytes': 24007335,
 'httpcompression/response_count': 471,
 'item_scraped_count': 391,
 'log_count/INFO': 30,
 'memusage/max': 91111424,
 'memusage/startup': 82763776,
 'request_depth_max': 3,
 'response_received_count': 471,
 'scheduler/dequeued': 471,
 'scheduler/dequeued/memory': 471,
 'scheduler/enqueued': 471,
 'scheduler/enqueued/memory': 471,
 'start_time': datetime.datetime(2022, 9, 25, 16, 0, 7, 826914)}

我們可以再訂下規則來判定此次程式是否如預期,例如爬蟲程式正確結束的條件是

可以依照實際需求修改量尺

  1. 訪問的 URL 數量必須大於 1
  2. 被寫入資料庫的資料 (item) 大於 0

則我們可以在 SPIDERMON_SPIDER_CLOSE_MONITORS 新增兩個對應的 monitor 行為

# src/monitor.py
@monitors.name('Item count')
class ItemCountMonitor(Monitor):

    @monitors.name('Minimum request url')
    def test_minimum_page_of_process(self):
        item_extracted = getattr(
            self.data.stats, 'response_received_count', 0)
        minimum_threshold = 1

        self.assertTrue(
            item_extracted >= minimum_threshold, msg=f'Ithome iron page crawl less than {minimum_threshold} page'
        )

    @monitors.name('Minimum number of items')
    def test_minimum_number_of_items(self):
        item_extracted = getattr(
            self.data.stats, 'item_scraped_count', 0)
        minimum_threshold = 0

        self.assertTrue(
            item_extracted >= minimum_threshold, msg=f'Extracted less than {minimum_threshold} items'
        )

class SpiderCloseMonitorSuite(MonitorSuite):

    monitors = [
        ItemCountMonitor,
    ]
    monitors_failed_actions = [
      CloseSpiderAction
    ]

所以當 Scrapy 摘要中的 item_scraped_count 小於 0 或是 response_received_count小於1,則可以在執行狀態中觀察到:

  1. 正常狀態
    https://ithelp.ithome.com.tw/upload/images/20220926/20151613NJRRr3jvEY.png
  2. 異常狀態
    https://ithelp.ithome.com.tw/upload/images/20220926/201516131ebZDXTqXA.png

再來,可以藉由 monitors_failed_actions 做出 Action (如向 Slack 發出告警)

特別注意, monitor 項目沒有通過並不會讓程式的 return code 產生異常 ( > 0)

https://ithelp.ithome.com.tw/upload/images/20220926/20151613efOs1g5FEe.png

在寫入 Prodution table 前

當我們將資料從 MongoDB 儲存到本地的 csv 後,我們會先確認 csv 的資料筆數與 MongoDB 的資料筆數一致

stage("Check mongo data"){
    steps{
        sh  '''
        MONGO_DATA_COUNT=$(python3 mongo_client.py -c ${DATA} count-data --contain-header)
        CSV_DATA_COUNT=$(cat output/${DATA}/${DATA}.csv|wc -l)
        echo "Mongo data count: ${MONGO_DATA_COUNT}"
        echo "CSV data count: ${CSV_DATA_COUNT}"
        if [ $CSV_DATA_COUNT != $CSV_DATA_COUNT ]; then exit 1; fi
        '''
    }
}

在確認筆數一致後,我們接著來檢測資料品質 (Data Quality)。此次會使用 InfuseAI 開源的 Data Quality Tool - PipeRider

PipeRider 官方網站
PipeRider Github
PipeRider 文件

我們可以簡單由 pip install piperider 即可進行 PipeRider 的安裝。

在一開始時,我們需要初始化 PipeRider 專案,依序宣吿

  1. 資料來源名稱
  2. 資料型態 (bigquery、postgres、csv 等等......)
  3. 由於我們的資料為 csv,所以需要宣告 csv 檔案位置 (因為我們會將 piperider 串到 Jenkins 中,所以建議以相對路徑進行宣告)

在完成設定檔的宣告後,會自動產生在當前路徑下生成 .piperider 資料夾,並且會依據方才的宣告內容產生對應的 config.yaml

piperider init

https://ithelp.ithome.com.tw/upload/images/20220926/201516137R1jLKGC7f.png

# .piperider/config.yaml

dataSources:
- name: demo
  type: csv
  path: ../content_info/content_info.csv

# profiler:
#   table:
#     # the maximum row count to profile. (Default unlimited)
#     limit: 1000000

# The tables to include/exclude
# includes: []
# excludes: []

# tables:
#   my-table-name:
#     # description of the table
#     description: "this is a table description"
#     columns:
#       my-col-name:
#         # description of the column
#         description: "this is a column description"

telemetry:
  id: 9f9dd57ab43c4792a618baa15047f953

再來我們來確認設定檔的正確性,以及資料來源的連線測試

piperider diagnose

https://ithelp.ithome.com.tw/upload/images/20220926/20151613YuVCfSwjbg.png

接著,我們可以開始檢測我們的資料品質

piperider run

https://ithelp.ithome.com.tw/upload/images/20220926/201516138HOVnp8TVB.png

更多細部參數調整可以看官網介紹

在執行結束後,PipeRider 會產生一個分析報告(靜態 HTML)

https://ithelp.ithome.com.tw/upload/images/20220926/20151613W7BwMrb1yl.png

這邊我們來假設一種錯誤情況: 在 recommended_content_info.yaml 的測試項目下,_id 的資料應為唯一值,所以我們故意讓其中兩筆的 _id 重複 20152012-5491-10298343

# .piperider/assertions/recommended_content_info.yaml
columns:
_id:  # Column Name
  # Test Cases for Column
  tests:
  - name: assert_column_schema_type
    assert:
      schema_type: VARCHAR
    tags:
    - RECOMMENDED
  - name: assert_column_unique
    tags:
    - RECOMMENDED
  - name: assert_column_not_null
    tags:
    - RECOMMENDED

https://ithelp.ithome.com.tw/upload/images/20220926/20151613Ks2ldCdZoS.png

讓我們再次執行 piperider run 進行資料測試,這時我們可以在 Terminal 看到 content_info._id 的測試項目會出現 [Failed]
https://ithelp.ithome.com.tw/upload/images/20220926/20151613v9TDDcmlQC.png

目前並未看到 PipeRider 支援可以在 Terminal 執行下拋出異常訊號(return code != 0)用以判斷測試結果是否異常。但是我們可以觀察到 PipeRider 的測試相關結果資料均會寫在 .piperider/outputs/latest/run.json 中,所以可以利用一個 python script 用以解析 run.json 內容,幫助我們判斷測試結果狀態。
https://ithelp.ithome.com.tw/upload/images/20220926/20151613pqzbvbRGf2.png

@click.command()
@click.option('--run-json-file', type=str, default='')
@click.option('--data-source-name', type=str, required=True)
def main(data_source_name: str, run_json_file: str) -> None:
    if not run_json_file: 
        run_json_file = f'./output/{data_source_name}/.piperider/outputs/latest/run.json'
    run_json_data = json.load(open(run_json_file))
    test_result = run_json_data['tables'][data_source_name]['piperider_assertion_result']['columns']
    TEST_RESULT='succeed'
    error_msg_list = []

    for col, result in test_result.items():
        for test_item in result:
            if test_item['status'] != 'passed':
                error_msg_list.append(f'{col}: {test_item}')
                TEST_RESULT='failed'

    if TEST_RESULT == 'failed':
        raise AssertionError('\n'.join(error_msg_list))

if __name__ == "__main__":
    main()

下面為 Jenkinsfile 串接 PipeRider 的範例 stage 寫法

DATA = [content_info, user_info]
請參考昨日 matrix 寫法一起服用

stage("Check data quality"){
    steps{
        sh """
        cd output/${DATA} && piperider run
        """
        
        sh "python3 get_piperider_result.py ----data-source-name ${DATA}"
    }
}
......
post {
    always{
        archiveArtifacts artifacts: 'output/**', followSymlinks: false
    }
} 
......

https://ithelp.ithome.com.tw/upload/images/20220926/20151613Hc2TaSpCWd.png

小結

今天我們就在爬蟲程式 (Scrapy) 、Local 與 PipeRider 中進行不同面向的資料品質檢測。
Scrapy 是一個 python 的爬蟲框架,上手起來可能沒那麼容易,但是功能強大適合在 Production 場景使用,所以非常推薦大家去看看
PipeRider 是一個 Python Base 的 Data Quality 工具 ,操作簡易也可以簡單用 yaml 擴充測試項目、Terminal 介面做的非常 fancy~ 非常推薦大家用來玩玩看,來更了解自己的資料。

參考資料

https://docs.scrapy.org/en/latest/intro/tutorial.html
https://spidermon.readthedocs.io/en/latest/monitors.html
https://github.com/scrapinghub/spidermon/blob/master/examples/tutorial/tutorial/actions.py
https://www.piperider.io
https://blog.infuseai.io/adding-data-observability-and-alerts-to-your-data-pipeline-is-easier-than-you-think-4e005daca55b


上一篇
第二十五天 Jenkins 之旅: 我的 BI 報表 Pipeline (5)
下一篇
第二十七天 Jenkins 之旅: 我的 BI 報表 Pipeline (7)
系列文
從零開始的 Jenkins 之旅30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言